Setting Up of Spark:
tar xfvz spark-2.0.0-bin-hadoop2.7.tgz
cd spark-2.0.0-bin-hadoop2.7

#running spark example
./bin/run-example org.apache.spark.examples.SparkPi

#running spark locally
./bin/spark-shell --master local[2] 

#creating spark session
spark = SparkSession\
     .builder\
     .appName("recommendationEngine")\     
	 .config("spark.some.config.option", "some-value")\     
	 .getOrCreate()
	 
#creating RDD from an existing data object
coll = List("a", "b", "c", "d", "e")

rdd_from_coll = sc.parallelize(coll)

#creating RDD from a referenced file
rdd_from_Text_File = sc.textFile("testdata.txt")

#Data loading
data = sc.textFile("~/ml-100k/udata.csv")

#loaded data will be a spark RDD type, run the below command to findout the data type of data object.
 type(data)
<class 'pyspark.rdd.RDD'>
# total length of the data loaded is given by:
data.count()
100001

# To load the first record in the loaded data:
data.first()
'UserID\tItemId \tRating\tTimestamp'

# to check the first 5 rows of the data RDD, we use take() action method :
 data.take(5)
['UserID\tItemId \tRating\tTimestamp', '196\t242\t3\t881250949', '186\t302\t3\t891717742', '22\t377\t1\t878887116', '244\t51\t2\t880606923']

#extract the first row from the data RDD object
header = data.first()

#filter function
data = data.filter(lambda l:l!=header)
#Now let us check the count of data RDD object, it has reduced from 100001 to 100000.
data.count()
100000

data.first()
'196\t242\t3\t881250949'

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
ratings = data.map(lambda l: l.split('\t'))\
	    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# check the data type of ratings object using type
type(ratings)
<class 'pyspark.rdd.PipelinedRDD'>

# check the first 5 records of the ratings pipelinedrdd object by running below code:
ratings.take(5)

[Rating(user=196, product=242, rating=3.0), Rating(user=186, product=302, rating=3.0), Rating(user=22, product=377, rating=1.0), Rating(user=244, product=51, rating=2.0), Rating(user=166, product=346, rating=1.0)]


#Data exploration
#total number of unique users:
df.select('user').distinct().show(5)

+----+
|user|
+----+
 |  26| 
 |  29| 
 | 474| 
 | 191| 
 |  65| 
 +----+
 only showing top 5 rows 

# total number of unique users
df.select('user').distinct().count() 943   

#total number of unique items
df.select('product').distinct().count() 1682

#display first 5 unique products:
df.select('product').distinct().show(5)
+-------+
 |product|
 +-------+ 
 |    474| 
 |     29|
 |     26| 
 |    964| 
 |   1677| 
 +-------+
 only showing top 5 rows 

# number of rated products by each user:
 df.groupBy("user").count().take(5) 
[Row(user=26, count=107), Row(user=29, count=34), Row(user=474, count=327), Row(user=191, count=27), Row(user=65, count=80)] Above results explains that User 26 has rated 107 movies, user 29 has rated 34 movies.

# of records for each rating type
 df.groupBy("rating").count().show() 
+------+-----+ 
|rating|count| 
+------+-----+ |
   1.0| 6110| 
   |   4.0|34174|
   |   3.0|27145|
   |   2.0|11370|
   |   5.0|21201|
   +------+-----+ 
   
 import numpy as np import matplotlib.pyplot as plt 
n_groups = 5 
x = df.groupBy("rating").count().select('count') 
xx = x.rdd.flatMap(lambda x: x).collect() fig, ax = plt.subplots() 
index = np.arange(n_groups) 
bar_width = 1 
opacity = 0.4 
rects1 = plt.bar(index, xx, bar_width,alpha=opacity,color='b', label='ratings')

 plt.xlabel('ratings')
 plt.ylabel('Counts')
 plt.title('Distribution of ratings')
 plt.xticks(index + bar_width, ('1.0', '2.0', '3.0', '4.0', '5.0'))
 plt.legend()
 plt.tight_layout() plt.show()




#statistics of ratings per user
df.groupBy("UserID").count().select('count').describe().show() 
+-------+------------------+   
|summary| count| 
+-------+------------------+ 
|count|943| 
|mean|106.04453870625663|
 |stddev|100.93174276633498| 
 | min|20| 
 |max|737|
 +-------+------------------+ 

# individual counts of ratings per user
 df.stat.crosstab("UserID", "Rating").show() 
+-------------+---+---+---+---+---+
|UserID_Rating|1.0|2.0|3.0|4.0|5.0| 
+-------------+---+---+---+---+---+ |
          645|  2|  2| 29| 55| 34|
		  |          892|  2| 13| 40| 99| 72| 
		  |           69|  2|  3| 21| 16| 23| 
		  |          809|  2|  2|  6|  5|  5| 
		  |          629|  1|  8| 24| 35| 53| 
		  |          365|  5|  9| 12| 23|  9|
		  |          138|  0|  1|  3| 28| 19|
		  |          760|  4|  7| 11| 13|  6| 
		  |          101|  3| 19| 28| 16|  1| 
		  |          479| 24| 14| 48| 85| 31| 
		  |          347| 20| 25| 37| 55| 62| 
		  |          846| 10| 46| 89|154|106| 
		  |          909|  0|  0|  5|  7| 14| 
		  |          333|  1|  1|  5| 13|  6| 
		  |          628|  0|  1|  1|  3| 22| 
		  |          249|  1|  5| 31| 63| 61| 
		  |          893|  1|  5| 28| 18|  7|
		|          468|  0|  9| 31| 55| 48|
		|          234| 14|103|205|126| 32|
		+-------------+---+---+---+---+---+ 
		only showing top 20 rows 
		
#Average rating given by each user:
 df.groupBy('UserID').agg({'Rating': 'mean'}).take(5) 
[Row(UserID=148, avg(Rating)=4.0), Row(UserID=463, avg(Rating)=2.8646616541353382), Row(UserID=471, avg(Rating)=3.3870967741935485), Row(UserID=496, avg(Rating)=3.0310077519379846), Row(UserID=833, avg(Rating)=3.056179775280899)] 

#Average rating per movie:
 df.groupBy('ItemId ').agg({'Rating': 'mean'}).take(5) 
[Row(ItemId =496, avg(Rating)=4.121212121212121), Row(ItemId =471, avg(Rating)=3.6108597285067874), Row(ItemId =463, avg(Rating)=3.859154929577465), Row(ItemId =148, avg(Rating)=3.203125), Row(ItemId =1342, avg(Rating)=2.5)] 

Building basic recommendation engine:
#divide the original data into training and test datasets randomly as below using randomSplit() method:
(training, test) = ratings.randomSplit([0.8, 0.2])

# counting the number of instances in the training dataset
training.count()
80154

#counting the number of instances in the test set.
test.count()
19846
# Build the recommendation model using Alternating Least Squares
#setting rank and maxIter parameters
rank = 10
numIterations = 10

#calling train() method with training data, rank, maxIter params.
model = ALS.train(training, rank, numIterations)
16/10/04 11:01:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/10/04 11:01:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
16/10/04 11:01:34 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
16/10/04 11:01:34 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
16/10/04 11:01:37 WARN Executor: 1 block locks were not released by TID = 122:
[rdd_221_0]
16/10/04 11:01:37 WARN Executor: 1 block locks were not released by TID = 123:
[rdd_222_0]
16/10/04 11:01:37 WARN Executor: 1 block locks were not released by TID = 124:
[rdd_221_0]
16/10/04 11:01:37 WARN Executor: 1 block locks were not released by TID = 125:
[rdd_222_0]

#checking the model as below, we observe that Matrixfactorizationmodel object is created.

 model
<pyspark.mllib.recommendation.MatrixFactorizationModel object at 0x7f24c34dd908>

Make predictions:

#below code extracts each row in the test data and extracts userID, ItemID and puts it in testdata pipelinedRDD object.
testdata = test.map(lambda p: (p[0], p[1]))

type(testdata)
<class 'pyspark.rdd.PipelinedRDD'>


#below code shows the original test data sample.
 test.take(5)

[Rating(user=119, product=392, rating=4.0), Rating(user=38, product=95, rating=5.0), Rating(user=63, product=277, rating=4.0), Rating(user=160, product=234, rating=5.0), Rating(user=225, product=193, rating=4.0)]


#below code displays the formated data required for making predictions:
testdata.take(5)

[(119, 392), (38, 95), (63, 277), (160, 234), (225, 193)]

Predictions:

#This method is used when we want to make predictions for a combination of user and item
pred_ind = model.predict(119, 392)

#we can observe below that the prediction value for a user 119 and movie 392 is 4.3926091845289275, just see above the original value for the same combination in test data
pred_ind

4.3926091845289275
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))

#below code to check the data type
type(predictions)
<class 'pyspark.rdd.PipelinedRDD'>

#below code displays the first five predictions
 predictions.take(5)

[((268, 68), 3.197299431949281), ((200, 68), 3.6296857016488357), ((916, 68), 3.070451877410571), ((648, 68), 2.165520614428771), ((640, 68), 3.821666263132798)]


UCBF:
#below code shows that recommendations are generated for all the 943 users.
recommedItemsToUsers.count()

943   

#Let us see the  recommendations to the first two users: 96, 
recommedItemsToUsers.take(2)

[
(96, (Rating(user=96, product=1159, rating=11.251653489172302), Rating(user=96, product=962, rating=11.1500279633824), Rating(user=96, product=534, rating=10.527262244626867), Rating(user=96, product=916, rating=10.066351313580977), Rating(user=96, product=390, rating=9.976996795233937), Rating(user=96, product=901, rating=9.564128162876036), Rating(user=96, product=1311, rating=9.13860044421153), Rating(user=96, product=1059, rating=9.081563794413025), Rating(user=96, product=1178, rating=9.028685203289745), Rating(user=96, product=968, rating=8.844312806737918)
)),
 (784, (Rating(user=784, product=904, rating=5.975314993539809), Rating(user=784, product=1195, rating=5.888552423210881), Rating(user=784, product=1169, rating=5.649927493462845), Rating(user=784, product=1446, rating=5.476279163198376), Rating(user=784, product=1019, rating=5.303140289874016), Rating(user=784, product=1242, rating=5.267858336331315), Rating(user=784, product=1086, rating=5.264190584020031), Rating(user=784, product=1311, rating=5.248377920702441), Rating(user=784, product=816, rating=5.173286729120303), Rating(user=784, product=1024, rating=5.1253425029498985)
))
]


Model evaluation:

# create a ratesAndPreds object by joining original ratings and predictions
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)


#cal
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()

[Stage 860:>                                                        (0 + 4) / 6]


Mean Squared Error = 1.1925845065690288  

from math import sqrt

rmse = sqrt(MSE)
rmse
1.092055175606539
type(ratings)
 <class 'pyspark.rdd.PipelinedRDD'>

#sql Context object is created when starting the spark session using pyspark
sqlContext
<pyspark.sql.context.SQLContext object at 0x7f24c94f7d68>


#creating a dataframe object from ratings rdd object as below:
df = sqlContext.createDataFrame(ratings)

type(df)

<class 'pyspark.sql.dataframe.DataFrame'>

#display first 20 records of dataframe object
df.show()

+----+-------+------+
|user|product|rating|
+----+-------+------+

| 196|    242|   3.0|
| 186|    302|   3.0|
|  22|    377|   1.0|
| 244|     51|   2.0|
| 166|    346|   1.0|
| 298|    474|   4.0|
| 115|    265|   2.0|
| 253|    465|   5.0|
| 305|    451|   3.0|
|   6|     86|   3.0|
|  62|    257|   2.0|
| 286|   1014|   5.0|
| 200|    222|   5.0|
| 210|     40|   3.0|
| 224|     29|   3.0|
| 303|    785|   3.0|
| 122|    387|   5.0|
| 194|    274|   2.0|
| 291|   1042|   4.0|
| 234|   1184|   2.0|
+----+-------+------+
only showing top 20 rows


# creating random samples of training set and test set using randomSplit() method.
(training, test) = df.randomSplit([0.8, 0.2])


#load modules required for running parameter tuningmodel
from pyspark.ml.recommendation import ALS
# estimator objects, as stated earlier, estimator takes algorithm or pipeline object as input. Let us build one pipeline object as below:
#calling ALS algorithm
als = ALS(userCol="user", itemCol="product", ratingCol="rating")
als

ALS_45108d6e011beae88f4c

#checking the type of als object
type(als)
<class 'pyspark.ml.recommendation.ALS'>

# let us see how the default parameters are set for ALS model
als.explainParams()
"alpha: alpha for implicit preference (default: 1.0)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. (default: 10)\nfinalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)\nimplicitPrefs: whether to use implicit preference (default: False)\nintermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)\nitemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: ItemId )\nmaxIter: max number of iterations (>= 0). (default: 10)\nnonnegative: whether to use nonnegative constraint for least squares (default: False)\nnumItemBlocks: number of item blocks (default: 10)\nnumUserBlocks: number of user blocks (default: 10)\npredictionCol: prediction column name. (default: prediction)\nrank: rank of the factorization (default: 10)\nratingCol: column name for ratings (default: rating, current: Rating)\nregParam: regularization parameter (>= 0). (default: 0.1)\nseed: random seed. (default: -1517157561977538513)\nuserCol: column name for user ids. Ids must be within the integer value range. (default: user, current: UserID)"

#create pipeline object and setting the created als model as a stage in the pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[als])
 type(pipeline)
<class 'pyspark.ml.pipeline.Pipeline'>
#loading cross-validation and param grid builder modules to create range of parameters.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramMapExplicit = ParamGridBuilder() \
                    .addGrid(als.rank, [8, 12]) \
                    .addGrid(als.maxIter, [10, 15]) \
                    .addGrid(als.regParam, [1.0, 10.0]) \
                    .build()


#loading the RegressionEvaluator model
from pyspark.ml.evaluation import RegressionEvaluator

#calling RegressionEvaluator() method with evaluation metric set to rmse and evaluation column set to Rating
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cvExplicit = CrossValidator(estimator=als, estimatorParamMaps=paramMap, evaluator=evaluatorR,numFolds=5)

#runnig the model using fit() method

cvModel = cvExplicit.fit(training)

[Stage 897:============================>                           (5 + 4) / 10]
[Stage 938:==================================================>     (9 + 1) / 10]
[Stage 1004:>(0 + 4) / 10][Stage 1005:> (0 + 0) / 2][Stage 1007:>(0 + 0) / 10]  
[Stage 1008:>                                                     (3 + 4) / 200]


preds = cvModel.bestModel.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(pred)
print("Root-mean-square error = " + str(rmse))
 rmse                                                                        

0.924617823674082
